package com.linkedin.dagli.objectio.avro;

import com.linkedin.dagli.objectio.ObjectIterator;
import com.linkedin.dagli.objectio.ObjectReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;


/**
 * {@link ObjectReader} that reads records from Avro file(s).
 *
 * Please note that this class supports a "count" metadata field in the Avro files.  If present, this field will be
 * interpreted as the number of records in the file; this can substantially speed up the call to
 * {@link AvroReader#size64}.
 */
public class AvroReader<T> implements ObjectReader<T> {
  private static final String AVRO_EXTENSION = ".avro";
  private static final String COUNT_FIELD = "count"; // Avro metadata field optionally used to store the record count

  private final List<Path> _paths;
  // the cached *exact* number of records available, or -1 meaning "not yet known"; approximations are never cached
  private long _count = -1;
  private final DatumReader<T> _datumReader;

  /**
   * <p>
   * Create a new instance that will read records of the provided record type from the given paths.  This type may be
   * a specific type (e.g. as generated by the Avro-Java schema code generator) or a GenericRecord (for reading
   * arbitrary Avro records as GenericRecord instances).
   * </p><p>
   * All Avro files are collectively treated as one contiguous collection of records; the order of the records will
   * match the order of the paths as passed to this method.
   * </p><p>
   * Each path may be either an Avro file or a directory containing *.avro files in its (sub)directories.  Files within
   * a specified directory will be ordered by their paths.
   * </p><p>
   * If no paths are given, the AvroReader will be "empty", containing no records.  This is normally useful only in
   * special circumstances.
   * </p>
   *
   * @throws IllegalArgumentException if no Avro files are provided (either directly or within provided directories)
   *                                  in a provided non-empty list of paths.  Not thrown if readPaths.length == 0.
   * @throws UncheckedIOException if an I/O error occurs while searching a provided directory for Avro files
   * @param avroType the type of the Avro record (e.g. the class created by the Avro-Java schema code generator)
   * @param readPaths zero or more paths that are either Avro files or directories containing Avro files
   */
  public AvroReader(Class<T> avroType, Path... readPaths) {
    _paths = Arrays.stream(readPaths).flatMap(path -> {
      try {
        // Non-directory paths are used as-is.  For directories, every regular *.avro file at any depth is found
        // (symbolic links are not followed).  flatMap closes each mapped stream, releasing the directory handles that
        // Files.find opens.
        return Files.isDirectory(path) ? Files.find(path, Integer.MAX_VALUE,
            (p, a) -> p.toString().endsWith(AVRO_EXTENSION) && a.isRegularFile()).sorted() : Stream.of(path);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }).collect(Collectors.toList());
    // non-directory paths are always kept, so an empty result here means every path was a directory with no .avro files
    if (_paths.isEmpty() && readPaths.length > 0) {
      throw new IllegalArgumentException("Paths to directories were provided, but no .avro files were found!");
    }

    _datumReader =
        avroType.equals(GenericRecord.class) ? new GenericDatumReader<>() : new SpecificDatumReader<T>(avroType);
  }

  @Override
  public long size64() {
    return size64(false, 0);
  }

  /**
   * Calculates the number of records in this reader.
   *
   * Only exact counts (computed with useCountMetadataOnly == false) are cached.  Once an exact count is known, it is
   * returned by every subsequent call regardless of the arguments, since it is at least as accurate as any
   * approximation.
   *
   * @param useCountMetadataOnly if true, only the "count" metadata field is used to compute the result.  Files that do
   *                             not have this field will be taken to contain "defaultIfNoMetadata" records.  This is
   *                             useful for getting a cheap approximation of the number of records present.  If false,
   *                             the true number of records across all files will be returned (this may be substantially
   *                             more expensive).
   * @param defaultIfNoMetadata if useCountMetadataOnly is true, files that do not have a "count" metadata field will be
   *                            assumed to contain this many records each.  If useCountMetadataOnly is false, this value
   *                            has no effect.
   * @return an approximation of the number of records or the exact number of records, depending on the value of
   *         useCountMetadataOnly
   * @throws UncheckedIOException if an Avro file cannot be opened or read
   */
  long size64(boolean useCountMetadataOnly, long defaultIfNoMetadata) {
    if (_count >= 0) {
      return _count;
    }

    long total = _paths.stream()
        .mapToLong(path -> countRecords(path, useCountMetadataOnly, defaultIfNoMetadata))
        .sum();

    // caching an approximation would make later size64() calls silently return it instead of the true count
    if (!useCountMetadataOnly) {
      _count = total;
    }
    return total;
  }

  /**
   * Counts (or, if requested, approximates) the number of records in a single Avro file.  The file is always closed
   * before this method returns.
   *
   * @param path the Avro file to examine
   * @param useCountMetadataOnly see {@link #size64(boolean, long)}
   * @param defaultIfNoMetadata see {@link #size64(boolean, long)}
   * @return the value of the file's "count" metadata field if present; otherwise defaultIfNoMetadata if
   *         useCountMetadataOnly is true; otherwise the sum of the record counts of all of the file's blocks
   * @throws UncheckedIOException if the file cannot be opened or read
   */
  private long countRecords(Path path, boolean useCountMetadataOnly, long defaultIfNoMetadata) {
    try (DataFileReader<T> dataFileReader = new DataFileReader<T>(path.toFile(), _datumReader)) {
      String countString = dataFileReader.getMetaString(COUNT_FIELD);
      if (countString != null) {
        return Long.parseLong(countString);
      } else if (useCountMetadataOnly) {
        // no metadata is available, so assume a size of defaultIfNoMetadata for this file
        return defaultIfNoMetadata;
      }

      // sum per-block record counts, skipping over each block's contents instead of deserializing every record
      long size = 0;
      while (dataFileReader.hasNext()) {
        size += dataFileReader.getBlockCount();
        dataFileReader.nextBlock();
      }
      return size;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /**
   * Iterates over the records in one or more Avro files, in the order the files are provided by the path iterator.
   *
   * @param <T> the type of Avro record to be iterated
   */
  private static class AvroIterator<T> implements ObjectIterator<T> {
    final Iterator<Path> _pathIterator;
    final DatumReader<T> _datumReader;
    DataFileReader<T> _dataFileReader = null; // reader for the file currently being iterated

    /**
     * Closes the current file (if any) and opens the next file from the path iterator.  The caller must ensure that
     * another path is available.
     *
     * @throws UncheckedIOException if closing the current file or opening the next one fails
     */
    private void nextDFR() {
      try {
        if (_dataFileReader != null) {
          _dataFileReader.close();
        }
        _dataFileReader = new DataFileReader<T>(_pathIterator.next().toFile(), _datumReader);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }

    /**
     * Creates a new instance.  The first file is opened immediately, so pathIterator must yield at least one path.
     *
     * @param pathIterator the Avro files providing the records to be iterated by this instance
     * @param reader a {@link DatumReader} for reading the desired record type
     */
    AvroIterator(Iterator<Path> pathIterator, DatumReader<T> reader) {
      _pathIterator = pathIterator;
      _datumReader = reader;
      nextDFR();
    }

    /**
     * Checks whether another record is available.  Exhausted files are closed and skipped, including files with no
     * records, until a file with a remaining record is found or all paths have been consumed.
     *
     * @return true if another record is available
     */
    @Override
    public boolean hasNext() {
      if (_dataFileReader.hasNext()) {
        return true;
      }
      while (_pathIterator.hasNext()) {
        nextDFR();
        if (_dataFileReader.hasNext()) {
          return true;
        }
      }
      // exhausted all paths
      return false;
    }

    @Override
    public T next() {
      // the call to hasNext() is required as it changes this object's state (it may advance to the next file):
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return _dataFileReader.next();
    }

    /**
     * Closes the file currently being read, if any.
     */
    @Override
    public void close() {
      try {
        if (_dataFileReader != null) {
          _dataFileReader.close();
        }
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }

  @Override
  public ObjectIterator<T> iterator() {
    // AvroIterator opens its first file eagerly, so an empty path list needs its own empty iterator
    if (_paths.isEmpty()) {
      return ObjectIterator.empty();
    }

    return new AvroIterator<>(_paths.iterator(), _datumReader);
  }

  /**
   * No-op: this reader holds no open resources itself.  Files are opened and closed by size64 and by the iterators it
   * creates.
   */
  @Override
  public void close() { }
}
